// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package cn.org.dbtools.star.stack.control.manager;

import cn.org.dbtools.star.manager.common.util.ServerAndAgentConstant;
import cn.org.dbtools.star.stack.component.SettingComponent;
import cn.org.dbtools.star.stack.component.StarManagerUserSpaceComponent;
import cn.org.dbtools.star.stack.dao.ClusterInfoRepository;
import cn.org.dbtools.star.stack.dao.ClusterInstanceRepository;
import cn.org.dbtools.star.stack.dao.ClusterModuleRepository;
import cn.org.dbtools.star.stack.dao.ClusterModuleServiceRepository;
import cn.org.dbtools.star.stack.dao.ResourceClusterRepository;
import cn.org.dbtools.star.stack.driver.ExecSqlShellClient;
import cn.org.dbtools.star.stack.driver.JdbcSampleClient;
import cn.org.dbtools.star.stack.entity.ClusterInfoEntity;
import cn.org.dbtools.star.stack.entity.ClusterModuleEntity;
import cn.org.dbtools.star.stack.entity.ClusterModuleServiceEntity;
import cn.org.dbtools.star.stack.entity.CoreUserEntity;
import cn.org.dbtools.star.stack.entity.ResourceClusterEntity;
import cn.org.dbtools.star.stack.model.request.control.PMResourceClusterAccessInfo;
import cn.org.dbtools.star.stack.model.request.control.StarClusterCreationReq;
import cn.org.dbtools.star.stack.model.request.control.StarClusterModuleDeployConfig;
import cn.org.dbtools.star.stack.model.request.control.StarClusterModuleResourceConfig;
import cn.org.dbtools.star.stack.model.request.space.ClusterCreateReq;
import cn.org.dbtools.star.stack.model.request.space.ClusterType;
import cn.org.dbtools.star.stack.model.request.space.NewUserSpaceCreateReq;
import cn.org.dbtools.star.stack.service.config.ConfigConstant;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Orchestrates the lifecycle of a Star cluster: user-space creation, resource-cluster
 * provisioning, module scheduling, configuration, deploy, start/stop/restart, access
 * registration and deletion.
 *
 * <p>Holds no mutable state of its own; every operation delegates to the injected
 * repositories and to {@link ResourceClusterManager} / {@link StarClusterModuleManager}.
 */
@Slf4j
@Component
public class StarClusterManager {

    @Autowired
    private StarManagerUserSpaceComponent userSpaceComponent;

    @Autowired
    private ClusterInfoRepository clusterRepository;

    // NOTE(review): currently unused in this class; kept for compatibility — confirm before removing.
    @Autowired
    private ClusterInstanceRepository instanceRepository;

    @Autowired
    private ClusterModuleRepository moduleRepository;

    @Autowired
    private ClusterModuleServiceRepository serviceRepository;

    @Autowired
    private ResourceClusterRepository resourceClusterRepository;

    @Autowired
    private ResourceClusterManager resourceClusterManager;

    @Autowired
    private StarClusterModuleManager clusterModuleManager;

    // NOTE(review): only referenced from previously disabled code; kept for compatibility.
    @Autowired
    private JdbcSampleClient jdbcClient;

    @Autowired
    ExecSqlShellClient httpExecSqlClient;

    @Autowired
    private SettingComponent settingComponent;

    /**
     * Creates a new user space. Transactional so that a partial creation is rolled back
     * on any exception.
     *
     * @param spaceInfo user-space creation request
     * @param creator   name of the creating user
     * @return id of the newly created user space
     * @throws Exception propagated from the user-space component; triggers rollback
     */
    @Transactional(rollbackFor = Exception.class)
    public long initOperation(NewUserSpaceCreateReq spaceInfo, String creator) throws Exception {
        return userSpaceComponent.create(spaceInfo, creator);
    }

    /**
     * Updates an existing user space. Transactional to keep the update atomic.
     *
     * @param user      the user performing the update
     * @param clusterId id of the cluster whose space is updated
     * @param spaceInfo new user-space data
     * @throws Exception propagated from the user-space component; triggers rollback
     */
    @Transactional(rollbackFor = Exception.class)
    public void updateClusterOperation(CoreUserEntity user, long clusterId,
                                       NewUserSpaceCreateReq spaceInfo) throws Exception {
        userSpaceComponent.update(user, clusterId, spaceInfo);
    }

    /**
     * Creates or updates the resource cluster backing the given cluster.
     *
     * <p>If the cluster has no resource cluster yet (id &lt; 1), a new one is created and
     * its id is persisted on the cluster entity; otherwise the existing one is updated.
     *
     * @param user              the requesting user
     * @param clusterInfoEntity the cluster being provisioned
     * @param authInfo          SSH access info for the resource hosts (sshKey may be rewritten,
     *                          see {@link #checkUserIdRsaFile})
     * @param hosts             host addresses that make up the resource cluster
     */
    public void createClusterResourceOperation(CoreUserEntity user, ClusterInfoEntity clusterInfoEntity,
                                               PMResourceClusterAccessInfo authInfo,
                                               List<String> hosts) {
        log.info("Create cluster {} resource cluster operation.", clusterInfoEntity.getId());
        long resourceClusterId = clusterInfoEntity.getResourceClusterId();
        if (resourceClusterId < 1L) {
            log.debug("Cluster {} resource cluster not exist, add a new one.", clusterInfoEntity.getId());
            checkUserIdRsaFile(clusterInfoEntity, authInfo, resourceClusterId);
            resourceClusterId = resourceClusterManager.initOperation(user.getId(), authInfo, hosts);
            clusterInfoEntity.setResourceClusterId(resourceClusterId);
            clusterRepository.save(clusterInfoEntity);
        } else {
            log.debug("Cluster {} resource cluster {} already exist, update it.",
                    clusterInfoEntity.getId(), resourceClusterId);
            checkUserIdRsaFile(clusterInfoEntity, authInfo, resourceClusterId);
            resourceClusterManager.updateOperation(resourceClusterId, user.getId(), authInfo, hosts);
        }
    }

    /**
     * If {@code authInfo.sshKey} looks like a file path ("~..." or an absolute path) and the
     * current running user's default private key ({@code ~/.ssh/id_rsa}) exists, rewrites
     * {@code sshKey} to the content of that key file so downstream consumers receive key
     * material rather than a path.
     *
     * <p>NOTE(review): the user-supplied path itself is deliberately ignored — only its
     * "looks like a path" shape matters and {@code ~/.ssh/id_rsa} is always read, matching
     * the original intent ("rewrite sshKey to the running user's private key content").
     * Confirm this is still the desired behavior.
     *
     * @param clusterInfoEntity cluster (used only for log context)
     * @param authInfo          access info whose sshKey may be rewritten in place
     * @param resourceClusterId resource cluster id (used only for log context)
     */
    private static void checkUserIdRsaFile(ClusterInfoEntity clusterInfoEntity,
                                           PMResourceClusterAccessInfo authInfo,
                                           long resourceClusterId) {
        String sshKey = authInfo.getSshKey();
        // Guard against a null sshKey (original code would NPE on startsWith).
        boolean looksLikePath = sshKey != null && (sshKey.startsWith("~") || sshKey.startsWith("/"));
        if (!looksLikePath) {
            return;
        }
        File sshKeyFile = FileUtils.getFile(FileUtils.getUserDirectoryPath() + "/.ssh/id_rsa");
        if (sshKeyFile.exists()) {
            try {
                // Read with an explicit charset; the no-charset overload is deprecated and
                // platform-dependent.
                authInfo.setSshKey(FileUtils.readFileToString(sshKeyFile, StandardCharsets.UTF_8));
            } catch (IOException e) {
                // Best-effort: keep the original sshKey value but log the full failure.
                log.error("Cluster {} resource cluster {} failed to replace sshKey from id_rsa, msg:{} ",
                        clusterInfoEntity.getId(), resourceClusterId, e.getMessage(), e);
            }
        }
    }

    /**
     * Persists install/data-dir info on the cluster and pushes image/install configuration
     * down to the resource cluster.
     */
    public void configClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, String feImageInfo, String beImageInfo,
                                               String installInfo, String dataDirInfo, int agentPort) {
        log.info("Config cluster {} resource info operation.", clusterInfoEntity.getId());
        clusterInfoEntity.setInstallInfo(installInfo);
        clusterInfoEntity.setDataDirInfo(dataDirInfo);
        clusterRepository.save(clusterInfoEntity);

        resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(), feImageInfo, beImageInfo,
                installInfo, agentPort);
    }

    /** Starts the resource cluster that backs the given cluster. */
    public void startClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, long requestId) throws Exception {
        log.info("Start cluster {} resource cluster operation.", clusterInfoEntity.getId());
        resourceClusterManager.startOperation(clusterInfoEntity.getResourceClusterId(), requestId);
    }

    /**
     * Schedules cluster modules onto nodes. Any previous scheduling data for the cluster is
     * deleted first (step fallback), then each module resource config is initialized.
     *
     * @param clusterId       id of the cluster being scheduled
     * @param resourceConfigs per-module node assignments; the FE module must have an odd
     *                        node count (FE quorum requirement)
     * @throws IllegalArgumentException if the FE node count is even
     * @throws Exception                propagated from delete/init operations
     */
    public void scheduleClusterOperation(long clusterId, List<StarClusterModuleResourceConfig> resourceConfigs) throws Exception {
        // Step fallback: if scheduling/allocation was done before, delete the created data;
        // otherwise this is a no-op.
        log.info("Schedule cluster {} operation.", clusterId);
        deleteClusterOperation(clusterId);

        // Broker nodes are now taken from the module nodes themselves; the dedicated broker
        // resource config that used to be built here was removed.
        for (StarClusterModuleResourceConfig resourceConfig : resourceConfigs) {
            if (resourceConfig.getModuleName().equals(ServerAndAgentConstant.FE_NAME)) {
                int nodeCount = resourceConfig.getNodeIds().size();
                if (nodeCount % 2 == 0) {
                    log.error("The number {} of Fe cannot be even", nodeCount);
                    // Validation failure: use an unchecked IllegalArgumentException instead of
                    // a raw Exception (still caught by existing catch (Exception) callers).
                    throw new IllegalArgumentException("The number of Fe cannot be even");
                }
            }
            clusterModuleManager.initOperation(clusterId, resourceConfig);
        }
    }

    /**
     * Applies deploy configuration to every module of the cluster, enriching each module's
     * config with registry/package and data-dir information from the resource cluster.
     *
     * @throws IllegalStateException if the cluster's resource cluster record is missing
     */
    public void configClusterOperation(ClusterInfoEntity clusterInfoEntity, StarClusterCreationReq request) {
        log.info("Config cluster {} operation.", clusterInfoEntity.getId());
        List<StarClusterModuleDeployConfig> deployConfigs = request.getDeployConfigs();
        // Fail with a descriptive message instead of an unchecked Optional.get().
        ResourceClusterEntity resourceClusterEntity =
                resourceClusterRepository.findById(clusterInfoEntity.getResourceClusterId())
                        .orElseThrow(() -> new IllegalStateException(
                                "Resource cluster " + clusterInfoEntity.getResourceClusterId()
                                        + " not found for cluster " + clusterInfoEntity.getId()));

        for (StarClusterModuleDeployConfig deployConfig : deployConfigs) {
            deployConfig.setPackageDir(resourceClusterEntity.getRegistryInfo());
            deployConfig.setFePackageDir(resourceClusterEntity.getFeRegistryInfo());
            deployConfig.setBePackageDir(resourceClusterEntity.getBeRegistryInfo());
            deployConfig.setLeaderIPInfo(request.getLeaderIPInfo());
            deployConfig.setDataDirInfo(request.getDataDirInfo());
            clusterModuleManager.configOperation(clusterInfoEntity.getId(), deployConfig);
        }
    }

    /** Deploys every module of the cluster. */
    public void deployClusterOperation(long clusterId, long requestId) {
        // TODO: Step fallback operation
        log.info("Deploy cluster {} operation.", clusterId);

        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.deployOperation(moduleEntity, requestId);
        }
    }

    /**
     * Post-deploy operation for a freshly created cluster: resets the root user's password
     * and builds the cluster access request from the FE service addresses.
     *
     * <p>FE-observer/BE/broker registration that used to happen here was disabled
     * (previously commented out); only the password update remains.
     *
     * @param clusterId   id of the deployed cluster
     * @param newPassword new password for the root user
     * @return access info (address, ports, credentials) for registering the cluster
     * @throws IllegalStateException if no FE JDBC service address was recorded
     * @throws Exception             propagated from the SQL client
     */
    public ClusterCreateReq deployClusterAfterOperation(long clusterId, String newPassword) throws Exception {
        log.info("Deploy cluster {} after operation.", clusterId);
        List<ClusterModuleServiceEntity> serviceEntities = serviceRepository.getByClusterId(clusterId);

        int feJdbcPort = 0;
        int feHttpPort = 0;
        List<String> feAccessInfo = new ArrayList<>();
        for (ClusterModuleServiceEntity serviceEntity : serviceEntities) {
            String serviceName = serviceEntity.getName();
            if (serviceName.equals(ServerAndAgentConstant.FE_JDBC_SERVICE)) {
                feJdbcPort = serviceEntity.getPort();
                // addressInfo is stored as a JSON array of host strings.
                feAccessInfo = JSON.parseArray(serviceEntity.getAddressInfo(), String.class);
            }
            if (serviceName.equals(ServerAndAgentConstant.FE_HTTP_SERVICE)) {
                feHttpPort = serviceEntity.getPort();
            }
        }

        // Explicit check instead of a bare IndexOutOfBoundsException on get(0).
        if (feAccessInfo == null || feAccessInfo.isEmpty()) {
            throw new IllegalStateException("No FE JDBC service address found for cluster " + clusterId);
        }

        log.debug("start init cluster");
        // Use the first FE host for the JDBC connection.
        String feHost = feAccessInfo.get(0);
        // Assemble connection parameters; the initial root password is empty (null).
        Map<String, String> connectProperties = new HashMap<>();
        connectProperties.put("host", feHost);
        connectProperties.put("port", String.valueOf(feJdbcPort));
        connectProperties.put("username", ServerAndAgentConstant.USER_ROOT);
        connectProperties.put("password", null);

        log.debug("Update star root user default password.");
        // The starrocks cluster has no admin user, so only root's password is updated.
        httpExecSqlClient.updateUserPassword(ServerAndAgentConstant.USER_ROOT, newPassword, connectProperties);

        // Build the cluster FE access info for the subsequent access operation.
        ClusterCreateReq clusterAccessInfo = new ClusterCreateReq();
        clusterAccessInfo.setPasswd(newPassword);
        clusterAccessInfo.setAddress(feHost);
        clusterAccessInfo.setUser(ServerAndAgentConstant.USER_ROOT);
        clusterAccessInfo.setQueryPort(feJdbcPort);
        clusterAccessInfo.setHttpPort(feHttpPort);
        clusterAccessInfo.setType(ClusterType.Star);
        return clusterAccessInfo;
    }

    /**
     * Registers the cluster access info against the stored cluster record.
     *
     * @throws IllegalStateException if the cluster record is missing
     */
    public void clusterAccessOperation(long clusterId, ClusterCreateReq clusterAccessInfo) throws Exception {
        log.info("Access cluster {} operation.", clusterId);
        ClusterInfoEntity clusterInfo = clusterRepository.findById(clusterId)
                .orElseThrow(() -> new IllegalStateException("Cluster " + clusterId + " not found"));
        userSpaceComponent.clusterAccess(clusterAccessInfo, clusterInfo);
    }

    /** Verifies the deployment of every module of the cluster. */
    public void checkClusterDeployOperation(long clusterId, long requestId) throws Exception {
        log.info("Check cluster {} deploy operation.", clusterId);
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.checkDeployOperation(moduleEntity, requestId);
        }
    }

    /**
     * Verifies every module's instances, then (re)writes the cluster config option that
     * selects Prometheus as the monitoring data source (delete first, then insert).
     */
    public void checkClusterInstancesOperation(long clusterId) throws Exception {
        log.info("Check cluster {} instances operation.", clusterId);
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.checkInstancesOperation(moduleEntity);
        }

        // Insert the cluster config option enabling Prometheus as the monitoring source;
        // delete any existing value first, then insert the new one.
        if (settingComponent.readAdminSetting(clusterId, ConfigConstant.MONITOR_SOURCE) != null) {
            settingComponent.deleteAdminSetting(clusterId, ConfigConstant.MONITOR_SOURCE);
        }
        settingComponent.addNewAdminSetting(clusterId, ConfigConstant.MONITOR_SOURCE,
                ConfigConstant.MONITOR_SRC_PROMETHEUS);
    }

    /** Stops every module instance of the cluster. */
    public void stopClusterOperation(long clusterId, long requestId) throws Exception {
        log.info("Stop cluster {} instances operation.", clusterId);
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.stopOperation(moduleEntity, requestId);
        }
    }

    /** Starts every module instance of the cluster. */
    public void startClusterOperation(long clusterId, long requestId) throws Exception {
        log.info("Start cluster {} instances operation.", clusterId);
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.startOperation(moduleEntity, requestId);
        }
    }

    /** Restarts every module instance of the cluster. */
    public void reStartClusterOperation(long clusterId, long requestId) throws Exception {
        log.info("Restart cluster {} instances operation.", clusterId);
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.restartOperation(moduleEntity, requestId);
        }
    }

    /** Deletes the cluster's module data and config options (public entry point). */
    public void deleteClusterOperation(ClusterInfoEntity clusterInfo) throws Exception {
        long clusterId = clusterInfo.getId();
        // Fixed placeholder/argument order: id first, then name (the original message read
        // "Delete <id> cluster <name>").
        log.info("Delete cluster {} ({}) instances operation.", clusterId, clusterInfo.getName());
        deleteClusterOperation(clusterId);
    }

    /** Deletes every module of the cluster and the cluster's config options. */
    private void deleteClusterOperation(long clusterId) throws Exception {
        List<ClusterModuleEntity> moduleEntities = moduleRepository.getByClusterId(clusterId);

        for (ClusterModuleEntity moduleEntity : moduleEntities) {
            clusterModuleManager.deleteOperation(moduleEntity);
        }

        // When deleting the cluster, also remove its config options.
        if (settingComponent.readAdminSetting(clusterId, ConfigConstant.MONITOR_SOURCE) != null) {
            settingComponent.deleteAdminSetting(clusterId, ConfigConstant.MONITOR_SOURCE);
        }
    }
}
